/**
 * GNU GENERAL PUBLIC LICENSE
 * Version 3, 29 June 2007
 * <p>
 * Copyright (C) 2007 Free Software Foundation, Inc. <https://fsf.org/>
 * Everyone is permitted to copy and distribute verbatim copies
 * of this license document, but changing it is not allowed.
 * <p>
 * Preamble
 * <p>
 * The GNU General Public License is a free, copyleft license for
 * software and other kinds of works.
 * <p>
 * The licenses for most software and other practical works are designed
 * to take away your freedom to share and change the works.  By contrast,
 * the GNU General Public License is intended to guarantee your freedom to
 * share and change all versions of a program--to make sure it remains free
 * software for all its users.  We, the Free Software Foundation, use the
 * GNU General Public License for most of our software; it applies also to
 * any other work released this way by its authors.  You can apply it to
 * your programs, too.
 * <p>
 * When we speak of free software, we are referring to freedom, not
 * price.  Our General Public Licenses are designed to make sure that you
 * have the freedom to distribute copies of free software (and charge for
 * them if you wish), that you receive source code or can get it if you
 * want it, that you can change the software or use pieces of it in new
 * free programs, and that you know you can do these things.
 * <p>
 * To protect your rights, we need to prevent others from denying you
 * these rights or asking you to surrender the rights.  Therefore, you have
 * certain responsibilities if you distribute copies of the software, or if
 * you modify it: responsibilities to respect the freedom of others.
 * <p>
 * For example, if you distribute copies of such a program, whether
 * gratis or for a fee, you must pass on to the recipients the same
 * freedoms that you received.  You must make sure that they, too, receive
 * or can get the source code.  And you must show them these terms so they
 * know their rights.
 * <p>
 * Developers that use the GNU GPL protect your rights with two steps:
 * (1) assert copyright on the software, and (2) offer you this License
 * giving you legal permission to copy, distribute and/or modify it.
 * <p>
 * For the developers' and authors' protection, the GPL clearly explains
 * that there is no warranty for this free software.  For both users' and
 * authors' sake, the GPL requires that modified versions be marked as
 * changed, so that their problems will not be attributed erroneously to
 * authors of previous versions.
 * <p>
 * Some devices are designed to deny users access to install or run
 * modified versions of the software inside them, although the manufacturer
 * can do so.  This is fundamentally incompatible with the aim of
 * protecting users' freedom to change the software.  The systematic
 * pattern of such abuse occurs in the area of products for individuals to
 * use, which is precisely where it is most unacceptable.  Therefore, we
 * have designed this version of the GPL to prohibit the practice for those
 * products.  If such problems arise substantially in other domains, we
 * stand ready to extend this provision to those domains in future versions
 * of the GPL, as needed to protect the freedom of users.
 * <p>
 * Finally, every program is threatened constantly by software patents.
 * States should not allow patents to restrict development and use of
 * software on general-purpose computers, but in those that do, we wish to
 * avoid the special danger that patents applied to a free program could
 * make it effectively proprietary.  To prevent this, the GPL assures that
 * patents cannot be used to render the program non-free.
 * <p>
 * The precise terms and conditions for copying, distribution and
 * modification follow.
 * <p>
 * TERMS AND CONDITIONS
 * <p>
 * 0. Definitions.
 * <p>
 * "This License" refers to version 3 of the GNU General Public License.
 * <p>
 * "Copyright" also means copyright-like laws that apply to other kinds of
 * works, such as semiconductor masks.
 * <p>
 * "The Program" refers to any copyrightable work licensed under this
 * License.  Each licensee is addressed as "you".  "Licensees" and
 * "recipients" may be individuals or organizations.
 * <p>
 * To "modify" a work means to copy from or adapt all or part of the work
 * in a fashion requiring copyright permission, other than the making of an
 * exact copy.  The resulting work is called a "modified version" of the
 * earlier work or a work "based on" the earlier work.
 * <p>
 * A "covered work" means either the unmodified Program or a work based
 * on the Program.
 * <p>
 * To "propagate" a work means to do anything with it that, without
 * permission, would make you directly or secondarily liable for
 * infringement under applicable copyright law, except executing it on a
 * computer or modifying a private copy.  Propagation includes copying,
 * distribution (with or without modification), making available to the
 * public, and in some countries other activities as well.
 * <p>
 * To "convey" a work means any kind of propagation that enables other
 * parties to make or receive copies.  Mere interaction with a user through
 * a computer network, with no transfer of a copy, is not conveying.
 * <p>
 * An interactive user interface displays "Appropriate Legal Notices"
 * to the extent that it includes a convenient and prominently visible
 * feature that (1) displays an appropriate copyright notice, and (2)
 * tells the user that there is no warranty for the work (except to the
 * extent that warranties are provided), that licensees may convey the
 * work under this License, and how to view a copy of this License.  If
 * the interface presents a list of user commands or options, such as a
 * menu, a prominent item in the list meets this criterion.
 * <p>
 * 1. Source Code.
 * <p>
 * The "source code" for a work means the preferred form of the work
 * for making modifications to it.  "Object code" means any non-source
 * form of a work.
 * <p>
 * A "Standard Interface" means an interface that either is an official
 * standard defined by a recognized standards body, or, in the case of
 * interfaces specified for a particular programming language, one that
 * is widely used among developers working in that language.
 * <p>
 * The "System Libraries" of an executable work include anything, other
 * than the work as a whole, that (a) is included in the normal form of
 * packaging a Major Component, but which is not part of that Major
 * Component, and (b) serves only to enable use of the work with that
 * Major Component, or to implement a Standard Interface for which an
 * implementation is available to the public in source code form.  A
 * "Major Component", in this context, means a major essential component
 * (kernel, window system, and so on) of the specific operating system
 * (if any) on which the executable work runs, or a compiler used to
 * produce the work, or an object code interpreter used to run it.
 * <p>
 * The "Corresponding Source" for a work in object code form means all
 * the source code needed to generate, install, and (for an executable
 * work) run the object code and to modify the work, including scripts to
 * control those activities.  However, it does not include the work's
 * System Libraries, or general-purpose tools or generally available free
 * programs which are used unmodified in performing those activities but
 * which are not part of the work.  For example, Corresponding Source
 * includes interface definition files associated with source files for
 * the work, and the source code for shared libraries and dynamically
 * linked subprograms that the work is specifically designed to require,
 * such as by intimate data communication or control flow between those
 * subprograms and other parts of the work.
 * <p>
 * The Corresponding Source need not include anything that users
 * can regenerate automatically from other parts of the Corresponding
 * Source.
 * <p>
 * The Corresponding Source for a work in source code form is that
 * same work.
 * <p>
 * 2. Basic Permissions.
 * <p>
 * All rights granted under this License are granted for the term of
 * copyright on the Program, and are irrevocable provided the stated
 * conditions are met.  This License explicitly affirms your unlimited
 * permission to run the unmodified Program.  The output from running a
 * covered work is covered by this License only if the output, given its
 * content, constitutes a covered work.  This License acknowledges your
 * rights of fair use or other equivalent, as provided by copyright law.
 * <p>
 * You may make, run and propagate covered works that you do not
 * convey, without conditions so long as your license otherwise remains
 * in force.  You may convey covered works to others for the sole purpose
 * of having them make modifications exclusively for you, or provide you
 * with facilities for running those works, provided that you comply with
 * the terms of this License in conveying all material for which you do
 * not control copyright.  Those thus making or running the covered works
 * for you must do so exclusively on your behalf, under your direction
 * and control, on terms that prohibit them from making any copies of
 * your copyrighted material outside their relationship with you.
 * <p>
 * Conveying under any other circumstances is permitted solely under
 * the conditions stated below.  Sublicensing is not allowed; section 10
 * makes it unnecessary.
 * <p>
 * 3. Protecting Users' Legal Rights From Anti-Circumvention Law.
 * <p>
 * No covered work shall be deemed part of an effective technological
 * measure under any applicable law fulfilling obligations under article
 * 11 of the WIPO copyright treaty adopted on 20 December 1996, or
 * similar laws prohibiting or restricting circumvention of such
 * measures.
 * <p>
 * When you convey a covered work, you waive any legal power to forbid
 * circumvention of technological measures to the extent such circumvention
 * is effected by exercising rights under this License with respect to
 * the covered work, and you disclaim any intention to limit operation or
 * modification of the work as a means of enforcing, against the work's
 * users, your or third parties' legal rights to forbid circumvention of
 * technological measures.
 * <p>
 * 4. Conveying Verbatim Copies.
 * <p>
 * You may convey verbatim copies of the Program's source code as you
 * receive it, in any medium, provided that you conspicuously and
 * appropriately publish on each copy an appropriate copyright notice;
 * keep intact all notices stating that this License and any
 * non-permissive terms added in accord with section 7 apply to the code;
 * keep intact all notices of the absence of any warranty; and give all
 * recipients a copy of this License along with the Program.
 * <p>
 * You may charge any price or no price for each copy that you convey,
 * and you may offer support or warranty protection for a fee.
 * <p>
 * 5. Conveying Modified Source Versions.
 * <p>
 * You may convey a work based on the Program, or the modifications to
 * produce it from the Program, in the form of source code under the
 * terms of section 4, provided that you also meet all of these conditions:
 * <p>
 * a) The work must carry prominent notices stating that you modified
 * it, and giving a relevant date.
 * <p>
 * b) The work must carry prominent notices stating that it is
 * released under this License and any conditions added under section
 * 7.  This requirement modifies the requirement in section 4 to
 * "keep intact all notices".
 * <p>
 * c) You must license the entire work, as a whole, under this
 * License to anyone who comes into possession of a copy.  This
 * License will therefore apply, along with any applicable section 7
 * additional terms, to the whole of the work, and all its parts,
 * regardless of how they are packaged.  This License gives no
 * permission to license the work in any other way, but it does not
 * invalidate such permission if you have separately received it.
 * <p>
 * d) If the work has interactive user interfaces, each must display
 * Appropriate Legal Notices; however, if the Program has interactive
 * interfaces that do not display Appropriate Legal Notices, your
 * work need not make them do so.
 * <p>
 * A compilation of a covered work with other separate and independent
 * works, which are not by their nature extensions of the covered work,
 * and which are not combined with it such as to form a larger program,
 * in or on a volume of a storage or distribution medium, is called an
 * "aggregate" if the compilation and its resulting copyright are not
 * used to limit the access or legal rights of the compilation's users
 * beyond what the individual works permit.  Inclusion of a covered work
 * in an aggregate does not cause this License to apply to the other
 * parts of the aggregate.
 * <p>
 * 6. Conveying Non-Source Forms.
 * <p>
 * You may convey a covered work in object code form under the terms
 * of sections 4 and 5, provided that you also convey the
 * machine-readable Corresponding Source under the terms of this License,
 * in one of these ways:
 * <p>
 * a) Convey the object code in, or embodied in, a physical product
 * (including a physical distribution medium), accompanied by the
 * Corresponding Source fixed on a durable physical medium
 * customarily used for software interchange.
 * <p>
 * b) Convey the object code in, or embodied in, a physical product
 * (including a physical distribution medium), accompanied by a
 * written offer, valid for at least three years and valid for as
 * long as you offer spare parts or customer support for that product
 * model, to give anyone who possesses the object code either (1) a
 * copy of the Corresponding Source for all the software in the
 * product that is covered by this License, on a durable physical
 * medium customarily used for software interchange, for a price no
 * more than your reasonable cost of physically performing this
 * conveying of source, or (2) access to copy the
 * Corresponding Source from a network server at no charge.
 * <p>
 * c) Convey individual copies of the object code with a copy of the
 * written offer to provide the Corresponding Source.  This
 * alternative is allowed only occasionally and noncommercially, and
 * only if you received the object code with such an offer, in accord
 * with subsection 6b.
 * <p>
 * d) Convey the object code by offering access from a designated
 * place (gratis or for a charge), and offer equivalent access to the
 * Corresponding Source in the same way through the same place at no
 * further charge.  You need not require recipients to copy the
 * Corresponding Source along with the object code.  If the place to
 * copy the object code is a network server, the Corresponding Source
 * may be on a different server (operated by you or a third party)
 * that supports equivalent copying facilities, provided you maintain
 * clear directions next to the object code saying where to find the
 * Corresponding Source.  Regardless of what server hosts the
 * Corresponding Source, you remain obligated to ensure that it is
 * available for as long as needed to satisfy these requirements.
 * <p>
 * e) Convey the object code using peer-to-peer transmission, provided
 * you inform other peers where the object code and Corresponding
 * Source of the work are being offered to the general public at no
 * charge under subsection 6d.
 * <p>
 * A separable portion of the object code, whose source code is excluded
 * from the Corresponding Source as a System Library, need not be
 * included in conveying the object code work.
 * <p>
 * A "User Product" is either (1) a "consumer product", which means any
 * tangible personal property which is normally used for personal, family,
 * or household purposes, or (2) anything designed or sold for incorporation
 * into a dwelling.  In determining whether a product is a consumer product,
 * doubtful cases shall be resolved in favor of coverage.  For a particular
 * product received by a particular user, "normally used" refers to a
 * typical or common use of that class of product, regardless of the status
 * of the particular user or of the way in which the particular user
 * actually uses, or expects or is expected to use, the product.  A product
 * is a consumer product regardless of whether the product has substantial
 * commercial, industrial or non-consumer uses, unless such uses represent
 * the only significant mode of use of the product.
 * <p>
 * "Installation Information" for a User Product means any methods,
 * procedures, authorization keys, or other information required to install
 * and execute modified versions of a covered work in that User Product from
 * a modified version of its Corresponding Source.  The information must
 * suffice to ensure that the continued functioning of the modified object
 * code is in no case prevented or interfered with solely because
 * modification has been made.
 * <p>
 * If you convey an object code work under this section in, or with, or
 * specifically for use in, a User Product, and the conveying occurs as
 * part of a transaction in which the right of possession and use of the
 * User Product is transferred to the recipient in perpetuity or for a
 * fixed term (regardless of how the transaction is characterized), the
 * Corresponding Source conveyed under this section must be accompanied
 * by the Installation Information.  But this requirement does not apply
 * if neither you nor any third party retains the ability to install
 * modified object code on the User Product (for example, the work has
 * been installed in ROM).
 * <p>
 * The requirement to provide Installation Information does not include a
 * requirement to continue to provide support service, warranty, or updates
 * for a work that has been modified or installed by the recipient, or for
 * the User Product in which it has been modified or installed.  Access to a
 * network may be denied when the modification itself materially and
 * adversely affects the operation of the network or violates the rules and
 * protocols for communication across the network.
 * <p>
 * Corresponding Source conveyed, and Installation Information provided,
 * in accord with this section must be in a format that is publicly
 * documented (and with an implementation available to the public in
 * source code form), and must require no special password or key for
 * unpacking, reading or copying.
 * <p>
 * 7. Additional Terms.
 * <p>
 * "Additional permissions" are terms that supplement the terms of this
 * License by making exceptions from one or more of its conditions.
 * Additional permissions that are applicable to the entire Program shall
 * be treated as though they were included in this License, to the extent
 * that they are valid under applicable law.  If additional permissions
 * apply only to part of the Program, that part may be used separately
 * under those permissions, but the entire Program remains governed by
 * this License without regard to the additional permissions.
 * <p>
 * When you convey a copy of a covered work, you may at your option
 * remove any additional permissions from that copy, or from any part of
 * it.  (Additional permissions may be written to require their own
 * removal in certain cases when you modify the work.)  You may place
 * additional permissions on material, added by you to a covered work,
 * for which you have or can give appropriate copyright permission.
 * <p>
 * Notwithstanding any other provision of this License, for material you
 * add to a covered work, you may (if authorized by the copyright holders of
 * that material) supplement the terms of this License with terms:
 * <p>
 * a) Disclaiming warranty or limiting liability differently from the
 * terms of sections 15 and 16 of this License; or
 * <p>
 * b) Requiring preservation of specified reasonable legal notices or
 * author attributions in that material or in the Appropriate Legal
 * Notices displayed by works containing it; or
 * <p>
 * c) Prohibiting misrepresentation of the origin of that material, or
 * requiring that modified versions of such material be marked in
 * reasonable ways as different from the original version; or
 * <p>
 * d) Limiting the use for publicity purposes of names of licensors or
 * authors of the material; or
 * <p>
 * e) Declining to grant rights under trademark law for use of some
 * trade names, trademarks, or service marks; or
 * <p>
 * f) Requiring indemnification of licensors and authors of that
 * material by anyone who conveys the material (or modified versions of
 * it) with contractual assumptions of liability to the recipient, for
 * any liability that these contractual assumptions directly impose on
 * those licensors and authors.
 * <p>
 * All other non-permissive additional terms are considered "further
 * restrictions" within the meaning of section 10.  If the Program as you
 * received it, or any part of it, contains a notice stating that it is
 * governed by this License along with a term that is a further
 * restriction, you may remove that term.  If a license document contains
 * a further restriction but permits relicensing or conveying under this
 * License, you may add to a covered work material governed by the terms
 * of that license document, provided that the further restriction does
 * not survive such relicensing or conveying.
 * <p>
 * If you add terms to a covered work in accord with this section, you
 * must place, in the relevant source files, a statement of the
 * additional terms that apply to those files, or a notice indicating
 * where to find the applicable terms.
 * <p>
 * Additional terms, permissive or non-permissive, may be stated in the
 * form of a separately written license, or stated as exceptions;
 * the above requirements apply either way.
 * <p>
 * 8. Termination.
 * <p>
 * You may not propagate or modify a covered work except as expressly
 * provided under this License.  Any attempt otherwise to propagate or
 * modify it is void, and will automatically terminate your rights under
 * this License (including any patent licenses granted under the third
 * paragraph of section 11).
 * <p>
 * However, if you cease all violation of this License, then your
 * license from a particular copyright holder is reinstated (a)
 * provisionally, unless and until the copyright holder explicitly and
 * finally terminates your license, and (b) permanently, if the copyright
 * holder fails to notify you of the violation by some reasonable means
 * prior to 60 days after the cessation.
 * <p>
 * Moreover, your license from a particular copyright holder is
 * reinstated permanently if the copyright holder notifies you of the
 * violation by some reasonable means, this is the first time you have
 * received notice of violation of this License (for any work) from that
 * copyright holder, and you cure the violation prior to 30 days after
 * your receipt of the notice.
 * <p>
 * Termination of your rights under this section does not terminate the
 * licenses of parties who have received copies or rights from you under
 * this License.  If your rights have been terminated and not permanently
 * reinstated, you do not qualify to receive new licenses for the same
 * material under section 10.
 * <p>
 * 9. Acceptance Not Required for Having Copies.
 * <p>
 * You are not required to accept this License in order to receive or
 * run a copy of the Program.  Ancillary propagation of a covered work
 * occurring solely as a consequence of using peer-to-peer transmission
 * to receive a copy likewise does not require acceptance.  However,
 * nothing other than this License grants you permission to propagate or
 * modify any covered work.  These actions infringe copyright if you do
 * not accept this License.  Therefore, by modifying or propagating a
 * covered work, you indicate your acceptance of this License to do so.
 * <p>
 * 10. Automatic Licensing of Downstream Recipients.
 * <p>
 * Each time you convey a covered work, the recipient automatically
 * receives a license from the original licensors, to run, modify and
 * propagate that work, subject to this License.  You are not responsible
 * for enforcing compliance by third parties with this License.
 * <p>
 * An "entity transaction" is a transaction transferring control of an
 * organization, or substantially all assets of one, or subdividing an
 * organization, or merging organizations.  If propagation of a covered
 * work results from an entity transaction, each party to that
 * transaction who receives a copy of the work also receives whatever
 * licenses to the work the party's predecessor in interest had or could
 * give under the previous paragraph, plus a right to possession of the
 * Corresponding Source of the work from the predecessor in interest, if
 * the predecessor has it or can get it with reasonable efforts.
 * <p>
 * You may not impose any further restrictions on the exercise of the
 * rights granted or affirmed under this License.  For example, you may
 * not impose a license fee, royalty, or other charge for exercise of
 * rights granted under this License, and you may not initiate litigation
 * (including a cross-claim or counterclaim in a lawsuit) alleging that
 * any patent claim is infringed by making, using, selling, offering for
 * sale, or importing the Program or any portion of it.
 * <p>
 * 11. Patents.
 * <p>
 * A "contributor" is a copyright holder who authorizes use under this
 * License of the Program or a work on which the Program is based.  The
 * work thus licensed is called the contributor's "contributor version".
 * <p>
 * A contributor's "essential patent claims" are all patent claims
 * owned or controlled by the contributor, whether already acquired or
 * hereafter acquired, that would be infringed by some manner, permitted
 * by this License, of making, using, or selling its contributor version,
 * but do not include claims that would be infringed only as a
 * consequence of further modification of the contributor version.  For
 * purposes of this definition, "control" includes the right to grant
 * patent sublicenses in a manner consistent with the requirements of
 * this License.
 * <p>
 * Each contributor grants you a non-exclusive, worldwide, royalty-free
 * patent license under the contributor's essential patent claims, to
 * make, use, sell, offer for sale, import and otherwise run, modify and
 * propagate the contents of its contributor version.
 * <p>
 * In the following three paragraphs, a "patent license" is any express
 * agreement or commitment, however denominated, not to enforce a patent
 * (such as an express permission to practice a patent or covenant not to
 * sue for patent infringement).  To "grant" such a patent license to a
 * party means to make such an agreement or commitment not to enforce a
 * patent against the party.
 * <p>
 * If you convey a covered work, knowingly relying on a patent license,
 * and the Corresponding Source of the work is not available for anyone
 * to copy, free of charge and under the terms of this License, through a
 * publicly available network server or other readily accessible means,
 * then you must either (1) cause the Corresponding Source to be so
 * available, or (2) arrange to deprive yourself of the benefit of the
 * patent license for this particular work, or (3) arrange, in a manner
 * consistent with the requirements of this License, to extend the patent
 * license to downstream recipients.  "Knowingly relying" means you have
 * actual knowledge that, but for the patent license, your conveying the
 * covered work in a country, or your recipient's use of the covered work
 * in a country, would infringe one or more identifiable patents in that
 * country that you have reason to believe are valid.
 * <p>
 * If, pursuant to or in connection with a single transaction or
 * arrangement, you convey, or propagate by procuring conveyance of, a
 * covered work, and grant a patent license to some of the parties
 * receiving the covered work authorizing them to use, propagate, modify
 * or convey a specific copy of the covered work, then the patent license
 * you grant is automatically extended to all recipients of the covered
 * work and works based on it.
 * <p>
 * A patent license is "discriminatory" if it does not include within
 * the scope of its coverage, prohibits the exercise of, or is
 * conditioned on the non-exercise of one or more of the rights that are
 * specifically granted under this License.  You may not convey a covered
 * work if you are a party to an arrangement with a third party that is
 * in the business of distributing software, under which you make payment
 * to the third party based on the extent of your activity of conveying
 * the work, and under which the third party grants, to any of the
 * parties who would receive the covered work from you, a discriminatory
 * patent license (a) in connection with copies of the covered work
 * conveyed by you (or copies made from those copies), or (b) primarily
 * for and in connection with specific products or compilations that
 * contain the covered work, unless you entered into that arrangement,
 * or that patent license was granted, prior to 28 March 2007.
 * <p>
 * Nothing in this License shall be construed as excluding or limiting
 * any implied license or other defenses to infringement that may
 * otherwise be available to you under applicable patent law.
 * <p>
 * 12. No Surrender of Others' Freedom.
 * <p>
 * If conditions are imposed on you (whether by court order, agreement or
 * otherwise) that contradict the conditions of this License, they do not
 * excuse you from the conditions of this License.  If you cannot convey a
 * covered work so as to satisfy simultaneously your obligations under this
 * License and any other pertinent obligations, then as a consequence you may
 * not convey it at all.  For example, if you agree to terms that obligate you
 * to collect a royalty for further conveying from those to whom you convey
 * the Program, the only way you could satisfy both those terms and this
 * License would be to refrain entirely from conveying the Program.
 * <p>
 * 13. Use with the GNU Affero General Public License.
 * <p>
 * Notwithstanding any other provision of this License, you have
 * permission to link or combine any covered work with a work licensed
 * under version 3 of the GNU Affero General Public License into a single
 * combined work, and to convey the resulting work.  The terms of this
 * License will continue to apply to the part which is the covered work,
 * but the special requirements of the GNU Affero General Public License,
 * section 13, concerning interaction through a network will apply to the
 * combination as such.
 * <p>
 * 14. Revised Versions of this License.
 * <p>
 * The Free Software Foundation may publish revised and/or new versions of
 * the GNU General Public License from time to time.  Such new versions will
 * be similar in spirit to the present version, but may differ in detail to
 * address new problems or concerns.
 * <p>
 * Each version is given a distinguishing version number.  If the
 * Program specifies that a certain numbered version of the GNU General
 * Public License "or any later version" applies to it, you have the
 * option of following the terms and conditions either of that numbered
 * version or of any later version published by the Free Software
 * Foundation.  If the Program does not specify a version number of the
 * GNU General Public License, you may choose any version ever published
 * by the Free Software Foundation.
 * <p>
 * If the Program specifies that a proxy can decide which future
 * versions of the GNU General Public License can be used, that proxy's
 * public statement of acceptance of a version permanently authorizes you
 * to choose that version for the Program.
 * <p>
 * Later license versions may give you additional or different
 * permissions.  However, no additional obligations are imposed on any
 * author or copyright holder as a result of your choosing to follow a
 * later version.
 * <p>
 * 15. Disclaimer of Warranty.
 * <p>
 * THERE IS NO WARRANTY FOR THE PROGRAM, TO THE EXTENT PERMITTED BY
 * APPLICABLE LAW.  EXCEPT WHEN OTHERWISE STATED IN WRITING THE COPYRIGHT
 * HOLDERS AND/OR OTHER PARTIES PROVIDE THE PROGRAM "AS IS" WITHOUT WARRANTY
 * OF ANY KIND, EITHER EXPRESSED OR IMPLIED, INCLUDING, BUT NOT LIMITED TO,
 * THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
 * PURPOSE.  THE ENTIRE RISK AS TO THE QUALITY AND PERFORMANCE OF THE PROGRAM
 * IS WITH YOU.  SHOULD THE PROGRAM PROVE DEFECTIVE, YOU ASSUME THE COST OF
 * ALL NECESSARY SERVICING, REPAIR OR CORRECTION.
 * <p>
 * 16. Limitation of Liability.
 * <p>
 * IN NO EVENT UNLESS REQUIRED BY APPLICABLE LAW OR AGREED TO IN WRITING
 * WILL ANY COPYRIGHT HOLDER, OR ANY OTHER PARTY WHO MODIFIES AND/OR CONVEYS
 * THE PROGRAM AS PERMITTED ABOVE, BE LIABLE TO YOU FOR DAMAGES, INCLUDING ANY
 * GENERAL, SPECIAL, INCIDENTAL OR CONSEQUENTIAL DAMAGES ARISING OUT OF THE
 * USE OR INABILITY TO USE THE PROGRAM (INCLUDING BUT NOT LIMITED TO LOSS OF
 * DATA OR DATA BEING RENDERED INACCURATE OR LOSSES SUSTAINED BY YOU OR THIRD
 * PARTIES OR A FAILURE OF THE PROGRAM TO OPERATE WITH ANY OTHER PROGRAMS),
 * EVEN IF SUCH HOLDER OR OTHER PARTY HAS BEEN ADVISED OF THE POSSIBILITY OF
 * SUCH DAMAGES.
 * <p>
 * 17. Interpretation of Sections 15 and 16.
 * <p>
 * If the disclaimer of warranty and limitation of liability provided
 * above cannot be given local legal effect according to their terms,
 * reviewing courts shall apply local law that most closely approximates
 * an absolute waiver of all civil liability in connection with the
 * Program, unless a warranty or assumption of liability accompanies a
 * copy of the Program in return for a fee.
 * <p>
 * END OF TERMS AND CONDITIONS
 * <p>
 * How to Apply These Terms to Your New Programs
 * <p>
 * If you develop a new program, and you want it to be of the greatest
 * possible use to the public, the best way to achieve this is to make it
 * free software which everyone can redistribute and change under these terms.
 * <p>
 * To do so, attach the following notices to the program.  It is safest
 * to attach them to the start of each source file to most effectively
 * state the exclusion of warranty; and each file should have at least
 * the "copyright" line and a pointer to where the full notice is found.
 * <p>
 * mydataharbor是一个致力于解决异构数据源之间数据同步的中间件
 * Copyright (C) 2020  xulang<1053618636@qq.com>
 * <p>
 * This program is free software: you can redistribute it and/or modify
 * it under the terms of the GNU General Public License as published by
 * the Free Software Foundation, either version 3 of the License, or
 * (at your option) any later version.
 * <p>
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 * GNU General Public License for more details.
 * <p>
 * You should have received a copy of the GNU General Public License
 * along with this program.  If not, see <https://www.gnu.org/licenses/>.
 * <p>
 * Also add information on how to contact you by electronic and paper mail.
 * <p>
 * If the program does terminal interaction, make it output a short
 * notice like this when it starts in an interactive mode:
 * <p>
 * mydataharbor  Copyright (C) 2020  xulang<1053618636@qq.com>
 * This program comes with ABSOLUTELY NO WARRANTY; for details type `show w'.
 * This is free software, and you are welcome to redistribute it
 * under certain conditions; type `show c' for details.
 * <p>
 * The hypothetical commands `show w' and `show c' should show the appropriate
 * parts of the General Public License.  Of course, your program's commands
 * might be different; for a GUI interface, you would use an "about box".
 * <p>
 * You should also get your employer (if you work as a programmer) or school,
 * if any, to sign a "copyright disclaimer" for the program, if necessary.
 * For more information on this, and how to apply and follow the GNU GPL, see
 * <https://www.gnu.org/licenses/>.
 * <p>
 * The GNU General Public License does not permit incorporating your program
 * into proprietary programs.  If your program is a subroutine library, you
 * may consider it more useful to permit linking proprietary applications with
 * the library.  If this is what you want to do, use the GNU Lesser General
 * Public License instead of this License.  But first, please read
 * <https://www.gnu.org/licenses/why-not-lgpl.html>.
 */


package mydataharbor.executor;

import lombok.extern.slf4j.Slf4j;
import mydataharbor.AbstractDataChecker;
import mydataharbor.ErrorRecord;
import mydataharbor.IDataConverter;
import mydataharbor.IDataPipeline;
import mydataharbor.IDataSink;
import mydataharbor.IDataSource;
import mydataharbor.IExecutorListener;
import mydataharbor.IProtocolData;
import mydataharbor.IProtocolDataChecker;
import mydataharbor.IProtocolDataConverter;
import mydataharbor.ISafeRun;
import mydataharbor.ITaskStorage;
import mydataharbor.exception.ResetException;
import mydataharbor.exception.TheEndException;
import mydataharbor.monitor.TaskExecutorMonitor;
import mydataharbor.setting.BaseSettingContext;
import mydataharbor.threadlocal.TaskStorageThreadLocal;

import java.io.Closeable;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;

/**
 * @param <T> raw data read from the source medium
 * @param <P> platform protocol data
 * @param <R> data handed to the writer
 * @param <S> setting context
 * @author xulang
 * @Date 2021/4/30
 **/
@Slf4j
public abstract class AbstractDataExecutor<T, P extends IProtocolData, R, S extends BaseSettingContext> extends Thread implements Closeable {

  // Pipeline that supplies the data source, converters, checker and sink driven by this executor.
  private IDataPipeline<T, P, R, S> dataPipeline;

  // Setting context obtained from the pipeline in the constructor.
  private S settingContext;

  /**
   * Keep-running flag: the main loop in run() exits once this becomes false (close() clears it).
   */
  private volatile boolean run = true;

  /**
   * Whether the executor is currently paused.
   */
  private volatile boolean suspend = false;

  /**
   * Whether the worker thread has ended; starts as true and is false only while run() is executing.
   */
  private volatile boolean end = true;

  /**
   * Processing counter; increased by the number of successfully written records.
   */
  private volatile AtomicLong writeCount = new AtomicLong();

  // Listeners notified about run / suspend / continue / close / end events.
  private List<IExecutorListener> executorListeners = new CopyOnWriteArrayList<>();

  /**
   * Thread pool for parallel processing; created lazily by doRun when parallel mode with an explicit thread count is configured.
   */
  protected ForkJoinPool forkJoinPool;

  // Rollback-unit keys flagged as rolled back; replaced with a fresh map by doRun for every non-empty poll.
  private Map<Object, Boolean> rollbackUnit;

  // Monitor receiving counters, timings and state flags; injected through setTaskMonitorMBean.
  private TaskExecutorMonitor taskMonitor;

  // Task storage published to TaskStorageThreadLocal on the worker threads; injected through setTaskStorage.
  private ITaskStorage taskStorage;

  /**
   * Creates an executor for the given pipeline.
   *
   * @param dataPipeline pipeline to execute; its setting context is captured here
   * @param threadName   name assigned to this thread
   */
  public AbstractDataExecutor(IDataPipeline<T, P, R, S> dataPipeline, String threadName) {
    setName(threadName);
    this.dataPipeline = dataPipeline;
    this.settingContext = this.dataPipeline.settingContext();
  }

  /**
   * Executes a listener notification and logs, instead of propagating, anything it throws.
   *
   * @param notification the action to execute
   */
  private void safeListenerRun(ISafeRun notification) {
    try {
      notification.run();
    } catch (Throwable failure) {
      // A misbehaving listener must never break the executor itself.
      log.error("通知listener时发生异常", failure);
    }
  }

  /**
   * Registers a listener that will be notified about executor events.
   *
   * @param executorListener listener to add
   */
  public void addListener(IExecutorListener executorListener) {
    this.executorListeners.add(executorListener);
  }

  /**
   * @return the pipeline this executor was created with
   */
  public IDataPipeline<T, P, R, S> getDataPipeline() {
    return this.dataPipeline;
  }

  /**
   * Worker loop of the executor thread.
   * <p>
   * Initialises every pipeline component, then repeatedly calls {@code doRun} until the
   * executor is closed or the data source throws {@link TheEndException}. While paused the
   * loop idles in 500 ms steps and can still be closed. Listeners are told about the start,
   * the normal end, or the exceptional end of the loop.
   * <p>
   * Component initialisation happens inside the guarded section, so a failing {@code init}
   * is reported through {@code onExceptionEnd} and still reaches the cleanup that sets
   * {@code end}; otherwise {@code close()} would wait forever for a thread that already died.
   */
  @Override
  public void run() {
    TaskStorageThreadLocal.set(taskStorage);
    end = false;
    taskMonitor.setEnd(end);
    if (!suspend) {
      safeListenerRun(() -> executorListeners.stream().forEach(listener -> listener.onRun(this, dataPipeline)));
    }
    try {
      IDataSource<T, S> dataSource = dataPipeline.dataSource();
      dataSource.init(settingContext);
      IProtocolDataConverter<T, P, S> protocolDataConverter = dataPipeline.protocolDataConverter();
      protocolDataConverter.init(settingContext);
      // The checker is optional.
      IProtocolDataChecker<P, S> checker = dataPipeline.checker();
      if (checker != null) {
        checker.init(settingContext);
      }
      IDataConverter<P, R, S> dataConverter = dataPipeline.dataConverter();
      dataConverter.init(settingContext);
      IDataSink<R, S> sink = dataPipeline.sink();
      sink.init(settingContext);

      taskMonitor.setTotal(dataSource.total());
      while (run) {
        while (suspend) {
          if (!run) {
            // Allow the executor to be closed while it is paused.
            break;
          }
          taskMonitor.setLastRunTime(System.currentTimeMillis());
          try {
            Thread.sleep(500);
          } catch (InterruptedException e) {
            log.error("暂停被打断");
          }
        }
        if (!run) {
          break;
        }
        long startTime = System.currentTimeMillis();
        try {
          boolean wrote = doRun(dataSource, protocolDataConverter, checker, dataConverter, sink);
          if (wrote) {
            // Only cycles that actually processed data count towards the used time.
            taskMonitor.useTimeIncrease(System.currentTimeMillis() - startTime);
          }
        } catch (TheEndException e) {
          // The source has no more data: leave the loop.
          break;
        } finally {
          taskMonitor.setLastRunTime(System.currentTimeMillis());
        }
        if (settingContext.getSleepTime() != 0L) {
          try {
            Thread.sleep(settingContext.getSleepTime());
          } catch (InterruptedException e) {
            log.error("睡眠被打断", e);
          }
        }
      }
      safeListenerRun(() -> executorListeners.stream().forEach(listener -> listener.onSucccessEnd(this, dataPipeline, writeCount.longValue(), run)));
    } catch (Throwable e) {
      log.error("发生未知异常任务线程异常退出", e);
      safeListenerRun(() -> executorListeners.stream().forEach(listener -> listener.onExceptionEnd(this, dataPipeline, e, writeCount.longValue())));
    } finally {
      end = true;
      taskMonitor.setEnd(end);
      // run is still true when the loop ended by itself (not through close()), so release resources here.
      if (run) {
        safeListenerRun(this::close);
      }
      if (forkJoinPool != null) {
        forkJoinPool.shutdown();
      }
      log.info("{}该线程结束！", getName());
    }

  }

  /**
   * Executes one poll cycle: polls the source, pushes every record through protocol
   * conversion, checking and data conversion (optionally in parallel), then writes and
   * commits or rolls back according to the setting context, updating the monitor on the way.
   *
   * @param dataProvider          initialised data source to poll, commit and roll back
   * @param protocolDataConverter converter from raw records to protocol data
   * @param checker               optional protocol data checker, may be {@code null}
   * @param dataConverter         converter from protocol data to writer records
   * @param writer                sink that receives the converted records
   * @return {@code false} when polling failed, the poll produced nothing, or the cycle was
   * rolled back because a reset exception was recorded; {@code true} otherwise
   * @throws TheEndException when the data source signals that there is no more data
   */
  private Boolean doRun(IDataSource<T, S> dataProvider, IProtocolDataConverter<T, P, S> protocolDataConverter, IProtocolDataChecker checker, IDataConverter<P, R, S> dataConverter, IDataSink<R, S> writer) throws TheEndException {
    // Records whose protocol conversion succeeded
    List<P> protocolConvertSuccess = Collections.synchronizedList(new ArrayList<>());
    // Records whose protocol conversion failed
    List<ErrorRecord<T, Object>> protocolConvertError = Collections.synchronizedList(new ArrayList<>());
    // Records that passed the check
    List<P> checkerSuccess = Collections.synchronizedList(new ArrayList<>());
    // Records that failed the check
    List<ErrorRecord<P, AbstractDataChecker.CheckResult>> checkerError = Collections.synchronizedList(new ArrayList<>());
    // Records whose data conversion succeeded
    List<R> dataConvertSuccess = Collections.synchronizedList(new ArrayList<>());
    // Records whose data conversion failed
    List<ErrorRecord<P, Object>> dataConvertError = Collections.synchronizedList(new ArrayList<>());
    // Successfully written records
    List<R> writeSuccess = Collections.synchronizedList(new ArrayList<>());
    // Records that failed to be written
    List<ErrorRecord<R, IDataSink.WriterResult>> writeError = Collections.synchronizedList(new ArrayList<>());
    // Raw records that made it into the write stage
    List<T> tRecordConvertSuccess = Collections.synchronizedList(new ArrayList<>());
    Iterable<T> tRecordsIterable;
    long tRecordUseTimeStart = System.currentTimeMillis();
    try {
      // Poll the source instance that was initialised and handed in by the caller, the same
      // one that is later committed / rolled back, rather than asking the pipeline again.
      tRecordsIterable = dataProvider.poll(settingContext);
    } catch (TheEndException e) {
      throw e;
    } catch (Exception e) {
      taskMonitor.addAndGettPollErrorCount(1L);
      log.error("poll发生异常",e);
      return false;
    }
    // A null result is treated like an empty poll instead of failing with a NullPointerException.
    if (tRecordsIterable == null || !tRecordsIterable.iterator().hasNext()) {
      return false;
    }
    taskMonitor.tRecordUseTimeIncrease(System.currentTimeMillis() - tRecordUseTimeStart);
    rollbackUnit = new ConcurrentHashMap<>();
    log.info("原始数据源数据：{}", tRecordsIterable);
    if (tRecordsIterable instanceof Collection) {
      taskMonitor.addAndGettRecordCount((long) ((Collection<?>) tRecordsIterable).size());
    } else {
      tRecordsIterable.forEach((record) -> taskMonitor.addAndGettRecordCount(1L));
    }
    Stream<T> stream = StreamSupport.stream(tRecordsIterable.spliterator(), settingContext.isParallel());
    // Parallel mode with an explicit thread count: run the stream inside a dedicated pool
    if (settingContext.isParallel() && settingContext.getThreadNum() != 0) {
      if (forkJoinPool == null) {
        forkJoinPool = new ForkJoinPool(settingContext.getThreadNum());
      }
      try {
        forkJoinPool.submit(() -> {
          forEach(stream, dataProvider, protocolDataConverter, checker, dataConverter, writer, protocolConvertSuccess, protocolConvertError, checkerSuccess, checkerError, dataConvertSuccess, dataConvertError, writeSuccess, writeError, tRecordConvertSuccess);
        }).get();
      } catch (InterruptedException | ExecutionException e) {
        log.error("并行执行任务发生异常！", e);
      }
    } else {
      forEach(stream, dataProvider, protocolDataConverter, checker, dataConverter, writer, protocolConvertSuccess, protocolConvertError, checkerSuccess, checkerError, dataConvertSuccess, dataConvertError, writeSuccess, writeError, tRecordConvertSuccess);
    }
    // Logging and monitor counters
    log.info("协议转换通过记录:{}", protocolConvertSuccess);
    taskMonitor.addAndGetProtocolConvertSuccessCount((long) protocolConvertSuccess.size());
    if (!protocolConvertError.isEmpty()) {
      log.info("协议转换失败记录:{}", protocolConvertError);
      taskMonitor.addAndGetProtocolConvertErrorCount((long) protocolConvertError.size());
    }
    log.info("检查通过记录:{}", checkerSuccess);
    taskMonitor.addAndGetCheckerSuccessCount((long) checkerSuccess.size());
    if (!checkerError.isEmpty()) {
      log.info("检查失败记录:{}", checkerError);
      taskMonitor.addAndGetCheckerErrorCount((long) checkerError.size());
    }
    log.info("数据转换通过记录:{}", dataConvertSuccess);
    taskMonitor.addAndGetDataConvertSuccessCount((long) dataConvertSuccess.size());
    if (!dataConvertError.isEmpty()) {
      log.info("数据转换失败记录:{}", dataConvertError);
      taskMonitor.addAndGetDataConvertErrorCount((long) dataConvertError.size());
    }
    // If any pre-write error carries a reset exception, abandon this cycle's write and commit
    if (isContainRestException(protocolConvertError, checkerError, dataConvertError)) {
      log.error("写入前流程发生reset异常！");
      tRecordsIterable.forEach(record -> rollbackUnit.put(dataProvider.rollbackTransactionUnit(record), true));
      dataProvider.rollback(tRecordsIterable, settingContext);
      return false;
    }

    if (tRecordConvertSuccess.size() == 0) {
      // Every record failed conversion and nothing needs a rollback
      dataProvider.commit(tRecordsIterable, settingContext);
    } else {
      if ((tRecordsIterable instanceof Collection) && ((Collection<T>) tRecordsIterable).size() != tRecordConvertSuccess.size()) {
        // Commit the records that cannot be written first
        List<T> tobeCommit = new ArrayList<>((Collection<? extends T>) tRecordsIterable);
        tobeCommit.removeAll(tRecordConvertSuccess);
        dataProvider.commit(tobeCommit, settingContext);
      }
      if (settingContext.isBatchWrite()) {
        // Batch write
        long writeUseTimeStart = System.currentTimeMillis();
        batchWrite(dataProvider, writer, dataConvertSuccess, writeSuccess, writeError, tRecordConvertSuccess);
        taskMonitor.writeUseTimeIncrease(System.currentTimeMillis() - writeUseTimeStart);
      } else if (settingContext.isBatchCommit()) {
        // Records were written one by one; commit them in one batch
        dataProvider.commit(tRecordConvertSuccess, settingContext);
      }
    }
    log.info("写入成功记录:{}", writeSuccess);
    writeCount.addAndGet(writeSuccess.size());
    taskMonitor.addAndGetWriteSuccessCount((long) writeSuccess.size());
    if (!writeError.isEmpty()) {
      log.info("写入失败记录:{}", writeError);
      taskMonitor.addAndGetWriteErrorCount((long) writeError.size());
    }
    return true;
  }

  /**
   * Pauses the executor: raises the suspend flag, mirrors it to the monitor and notifies
   * the listeners.
   */
  public void pause() {
    suspend = true;
    taskMonitor.setSuspend(suspend);
    safeListenerRun(() -> executorListeners.forEach(
      listener -> listener.onSuspend(this, dataPipeline, writeCount.get())));
  }

  /**
   * Resumes a paused executor: clears the suspend flag, mirrors it to the monitor and
   * notifies the listeners.
   */
  public void doContinue() {
    suspend = false;
    taskMonitor.setSuspend(suspend);
    safeListenerRun(() -> executorListeners.forEach(
      listener -> listener.onContinue(this, dataPipeline, writeCount.get())));
  }

  /**
   * Feeds every record of the stream through {@code doForEach}. A record whose rollback
   * unit is already flagged in this poll is skipped, unless the setting context asks to
   * keep processing after a rollback occurred.
   */
  protected void forEach(Stream<T> stream, IDataSource<T, S> dataProvider, IProtocolDataConverter<T, P, S> protocolDataConverter, IProtocolDataChecker checker, IDataConverter<P, R, S> dataConverter, IDataSink<R, S> writer, List<P> protocolConvertSuccess, List<ErrorRecord<T, Object>> protocolConvertError, List<P> checkerSuccess, List<ErrorRecord<P, IProtocolDataChecker.CheckResult>> checkerError, List<R> dataConvertSuccess, List<ErrorRecord<P, Object>> dataConvertError, List<R> writeSuccess, List<ErrorRecord<R, IDataSink.WriterResult>> writeError, List<T> tRecordConvertSucces) {
    stream.forEach(tRecord -> {
      // The stream may run on pool threads, so publish the task storage on each of them.
      TaskStorageThreadLocal.set(taskStorage);
      Object unitKey = dataProvider.rollbackTransactionUnit(tRecord);
      boolean unitRolledBack = rollbackUnit.getOrDefault(unitKey, false);
      if (unitRolledBack && !settingContext.isContinueOnRollbackOccurContinueInOncePoll()) {
        return;
      }
      doForEach(dataProvider, protocolDataConverter, checker, dataConverter, writer, protocolConvertSuccess, protocolConvertError, checkerSuccess, checkerError, dataConvertSuccess, dataConvertError, writeSuccess, writeError, tRecordConvertSucces, tRecord);
    });
  }


  /**
   * Processes a single raw record: protocol conversion, optional check, data conversion
   * and, when batch writing is disabled, the write itself. Processing of the record stops
   * at the first stage that fails; the failure is recorded in the matching error list by
   * the stage helper.
   *
   * @param dataProvider           data source used for commit / rollback of the record
   * @param protocolDataConverter  converter from the raw record to protocol data
   * @param checker                optional checker, may be {@code null}
   * @param dataConverter          converter from protocol data to writer records
   * @param writer                 sink that receives the converted records
   * @param protocolConvertSuccess collects protocol data that converted successfully
   * @param protocolConvertError   collects protocol conversion failures
   * @param checkerSuccess         collects protocol data that passed the check
   * @param checkerError           collects check failures
   * @param dataConvertSuccess     collects writer records that converted successfully
   * @param dataConvertError       collects data conversion failures
   * @param writeSuccess           collects successfully written records
   * @param writeError             collects write failures
   * @param tRecordConvertSuccess  collects raw records that reached the write stage
   * @param tRecord                the raw record to process
   */
  protected void doForEach(IDataSource<T, S> dataProvider, IProtocolDataConverter<T, P, S> protocolDataConverter, IProtocolDataChecker checker, IDataConverter<P, R, S> dataConverter, IDataSink<R, S> writer, List<P> protocolConvertSuccess, List<ErrorRecord<T, Object>> protocolConvertError, List<P> checkerSuccess, List<ErrorRecord<P, IProtocolDataChecker.CheckResult>> checkerError, List<R> dataConvertSuccess, List<ErrorRecord<P, Object>> dataConvertError, List<R> writeSuccess, List<ErrorRecord<R, IDataSink.WriterResult>> writeError, List<T> tRecordConvertSuccess, T tRecord) {
    // Protocol conversion
    long protocolConvertUseTimeStart = System.currentTimeMillis();
    P protocolData = protocolConvert(protocolDataConverter, protocolConvertSuccess, protocolConvertError, tRecord);
    taskMonitor.protocolConvertUseTimeIncrease(System.currentTimeMillis() - protocolConvertUseTimeStart);
    if (protocolData == null) {
      // Protocol conversion failed
      return;
    }
    if (checker != null) {
      // Data check
      long checkerUseTimeStart = System.currentTimeMillis();
      AbstractDataChecker.CheckResult checkResult = protocolDataCheck(checker, checkerSuccess, checkerError, protocolData);
      taskMonitor.checkerUseTimeIncrease(System.currentTimeMillis() - checkerUseTimeStart);
      // protocolDataCheck returns null when the checker itself threw (the failure is already
      // recorded in checkerError); treat that like a failed check instead of dereferencing null.
      if (checkResult == null || !checkResult.isPass()) {
        return;
      }
    } else {
      log.debug("checker没有配置");
      checkerSuccess.add(protocolData);
    }
    // Data conversion
    long dataConvertUseTimeStart = System.currentTimeMillis();
    List<R> records = dataConvert(dataConverter, dataConvertSuccess, dataConvertError, protocolData);
    taskMonitor.dataConvertUseTimeIncrease(System.currentTimeMillis() - dataConvertUseTimeStart);
    if (records == null) {
      // Data conversion failed
      log.error("数据转换失败!");
      return;
    }
    tRecordConvertSuccess.add(tRecord);
    // Single-record write (batch mode writes later, once per poll)
    if (!settingContext.isBatchWrite()) {
      long writeUseTimeStart = System.currentTimeMillis();
      singleRecordWrite(dataProvider, writer, writeSuccess, writeError, tRecord, records);
      taskMonitor.writeUseTimeIncrease(System.currentTimeMillis() - writeUseTimeStart);
    }
  }

  /**
   * Tells whether any error record in the given lists carries a {@link ResetException}
   * as its unknown exception.
   *
   * @param errorRecordList lists of {@link ErrorRecord} instances to inspect
   * @return {@code true} as soon as one such record is found, {@code false} otherwise
   */
  private boolean isContainRestException(List<?>... errorRecordList) {
    for (List<?> group : errorRecordList) {
      // instanceof is false for null, so a missing exception never matches.
      boolean hasReset = group.stream()
        .anyMatch(item -> ((ErrorRecord<?, ?>) item).getUnknownException() instanceof ResetException);
      if (hasReset) {
        return true;
      }
    }
    return false;
  }

  /**
   * Writes all converted records of one poll in a single sink call and then commits or
   * rolls back the corresponding raw records.
   * <p>
   * A failing commit is turned into a {@link ResetException}. In the failure handler a
   * reset exception leads to a rollback, any other exception to a commit; in both cases
   * every converted record is added to {@code writeError} with the exception attached.
   */
  private void batchWrite(IDataSource<T, S> dataSource, IDataSink<R, S> sink, List<R> dataConvertSuccess, List<R> writeSuccess, List<ErrorRecord<R, IDataSink.WriterResult>> writeError, List<T> tRecordConvertSuccess) {
    try {
      IDataSink.WriterResult outcome = sink.write(dataConvertSuccess, settingContext);
      if (!outcome.isSuccess()) {
        // The sink reported a known failure: attach its result to every record.
        List<ErrorRecord<R, IDataSink.WriterResult>> rejected = dataConvertSuccess.stream()
          .map(item -> ErrorRecord.<R, IDataSink.WriterResult>builder().record(item).knownError(outcome).build())
          .collect(Collectors.toList());
        writeError.addAll(rejected);
      } else {
        writeSuccess.addAll(dataConvertSuccess);
      }
      if (!outcome.isCommit()) {
        tRecordConvertSuccess.forEach(source -> rollbackUnit.put(dataSource.rollbackTransactionUnit(source), true));
        dataSource.rollback(tRecordConvertSuccess, settingContext);
      } else {
        try {
          // Guard against a failing commit
          dataSource.commit(tRecordConvertSuccess, settingContext);
        } catch (Exception commitFailure) {
          throw new ResetException("commit异常", commitFailure);
        }
      }
    } catch (Exception e) {
      log.error("批量写入异常！", e);
      if (e instanceof ResetException) {
        // Rollback itself is assumed not to fail here
        tRecordConvertSuccess.forEach(source -> rollbackUnit.put(dataSource.rollbackTransactionUnit(source), true));
        dataSource.rollback(tRecordConvertSuccess, settingContext);
      } else {
        dataSource.commit(tRecordConvertSuccess, settingContext);
      }
      List<ErrorRecord<R, IDataSink.WriterResult>> broken = dataConvertSuccess.stream()
        .map(item -> ErrorRecord.<R, IDataSink.WriterResult>builder().record(item).unknownException(e).build())
        .collect(Collectors.toList());
      writeError.addAll(broken);
    }
  }

  /**
   * Writes the records converted from one raw record and then commits or rolls back that
   * raw record.
   * <p>
   * Commits are skipped when batch commit is enabled. A failing commit is turned into a
   * {@link ResetException}. In the failure handler a reset exception leads to a rollback,
   * any other exception to a commit (unless batch commit is enabled); in both cases every
   * record is added to {@code writeError} with the exception attached.
   */
  private void singleRecordWrite(IDataSource<T, S> dataSource, IDataSink<R, S> sink, List<R> writeSuccess, List<ErrorRecord<R, IDataSink.WriterResult>> writeError, T tRecord, List<R> records) {
    try {
      // Exactly one record goes through the single-record overload, anything else through the list overload.
      IDataSink.WriterResult outcome = records.size() == 1
        ? sink.write(records.get(0), settingContext)
        : sink.write(records, settingContext);
      if (!outcome.isSuccess()) {
        for (R item : records) {
          writeError.add(ErrorRecord.<R, IDataSink.WriterResult>builder().record(item).knownError(outcome).build());
        }
      } else {
        writeSuccess.addAll(records);
      }
      if (!outcome.isCommit()) {
        // Roll back; rollback itself is assumed not to fail here
        rollbackUnit.put(dataSource.rollbackTransactionUnit(tRecord), true);
        dataSource.rollback(tRecord, settingContext);
      } else {
        try {
          // Guard against a failing commit
          if (!settingContext.isBatchCommit()) {
            dataSource.commit(tRecord, settingContext);
          }
        } catch (Exception commitFailure) {
          throw new ResetException("commit异常", commitFailure);
        }
      }
    } catch (Exception e) {
      log.error("单条写入异常", e);
      if (e instanceof ResetException) {
        // Roll the record back
        rollbackUnit.put(dataSource.rollbackTransactionUnit(tRecord), true);
        dataSource.rollback(tRecord, settingContext);
      } else if (!settingContext.isBatchCommit()) {
        // Commit the record
        dataSource.commit(tRecord, settingContext);
      }
      for (R item : records) {
        writeError.add(ErrorRecord.<R, IDataSink.WriterResult>builder().record(item).unknownException(e).build());
      }
    }
  }

  /**
   * Converts protocol data into writer records. A converter result that is a {@link List}
   * is used as is, any other result is wrapped into a single-element list; the records are
   * also appended to {@code dataConvertSuccess}.
   *
   * @return the converted records, or {@code null} when the converter threw (the failure is
   * then recorded in {@code dataConvertError})
   */
  private List<R> dataConvert(IDataConverter<P, R, S> dataConverter, List<R> dataConvertSuccess, List<ErrorRecord<P, Object>> dataConvertError, P protocolData) {
    try {
      Object converted = dataConverter.convert(protocolData, settingContext);
      List<R> produced;
      if (converted instanceof List) {
        produced = (List) converted;
      } else {
        produced = Collections.singletonList((R) converted);
      }
      dataConvertSuccess.addAll(produced);
      return produced;
    } catch (Exception e) {
      log.error("数据转换异常！", e);
      ErrorRecord<P, Object> failure = ErrorRecord.<P, Object>builder()
        .record(protocolData)
        .unknownException(e)
        .build();
      dataConvertError.add(failure);
      return null;
    }
  }

  /**
   * Runs the checker on one piece of protocol data and files it under
   * {@code checkerSuccess} or {@code checkerError} depending on the result.
   *
   * @return the check result, or {@code null} when the checker threw (the failure is then
   * recorded in {@code checkerError})
   */
  private AbstractDataChecker.CheckResult protocolDataCheck(IProtocolDataChecker checker, List<P> checkerSuccess, List<ErrorRecord<P, AbstractDataChecker.CheckResult>> checkerError, P protocolData) {
    try {
      AbstractDataChecker.CheckResult verdict = checker.check(null, protocolData, settingContext);
      if (!verdict.isPass()) {
        ErrorRecord<P, AbstractDataChecker.CheckResult> rejected = ErrorRecord.<P, AbstractDataChecker.CheckResult>builder()
          .record(protocolData)
          .knownError(verdict)
          .build();
        checkerError.add(rejected);
      } else {
        checkerSuccess.add(protocolData);
      }
      return verdict;
    } catch (Exception e) {
      log.error("校验发生异常！", e);
      ErrorRecord<P, AbstractDataChecker.CheckResult> failure = ErrorRecord.<P, AbstractDataChecker.CheckResult>builder()
        .record(protocolData)
        .unknownException(e)
        .build();
      checkerError.add(failure);
      return null;
    }
  }

  /**
   * Converts one raw record into protocol data and appends the result to
   * {@code protocolConvertSuccess}.
   *
   * @return the protocol data, or {@code null} when the converter threw (the failure is
   * then recorded in {@code protocolConvertError})
   */
  private P protocolConvert(IProtocolDataConverter<T, P, S> protocolDataConverter, List<P> protocolConvertSuccess, List<ErrorRecord<T, Object>> protocolConvertError, T tRecord) {
    try {
      P converted = protocolDataConverter.convert(tRecord, settingContext);
      protocolConvertSuccess.add(converted);
      return converted;
    } catch (Exception e) {
      log.error("协议转换失败！", e);
      ErrorRecord<T, Object> failure = ErrorRecord.<T, Object>builder()
        .record(tRecord)
        .unknownException(e)
        .build();
      protocolConvertError.add(failure);
      return null;
    }
  }

  /**
   * Stops the executor and releases its resources.
   * <p>
   * Clears the run flag, notifies the listeners, waits in 500 ms steps until the worker
   * loop has ended, then closes the task storage and the pipeline and shuts the parallel
   * pool down. The three release steps are chained with {@code try/finally} so a failure
   * in one of them does not leave the following resources open; if several steps fail,
   * the exception of the last failing step is the one that propagates.
   *
   * @throws IOException if closing the task storage or the pipeline fails
   */
  @Override
  public void close() throws IOException {
    run = false;
    taskMonitor.setRun(run);
    safeListenerRun(() -> executorListeners.stream().forEach(listener -> listener.onClose(this, dataPipeline, writeCount.longValue(), run)));
    if (!isAlive()) {
      // The worker is not running, so run() will not report the end: report onSucccessEnd from here
      safeListenerRun(() -> executorListeners.stream().forEach(listener -> listener.onSucccessEnd(this, dataPipeline, writeCount.longValue(), run)));
    }
    while (!end) {
      // Wait for the worker thread to finish
      try {
        Thread.sleep(500);
      } catch (InterruptedException e) {
        log.error("线程被打断！");
      }
    }
    try {
      // The storage is injected through a setter and may never have been set.
      if (taskStorage != null) {
        taskStorage.close();
      }
    } finally {
      try {
        dataPipeline.close();
      } finally {
        if (forkJoinPool != null) {
          forkJoinPool.shutdownNow();
        }
      }
    }
  }

  /**
   * Injects the monitor that receives this executor's counters, timings and state flags.
   *
   * @param taskMonitor monitor to use
   */
  public void setTaskMonitorMBean(TaskExecutorMonitor taskMonitor) {
    this.taskMonitor = taskMonitor;
  }

  /**
   * @return the monitor injected through {@code setTaskMonitorMBean}, or {@code null} if none was set
   */
  public TaskExecutorMonitor getTaskMonitor() {
    return this.taskMonitor;
  }

    /**
     * Injects the task storage that is published to the worker threads and closed by {@code close()}.
     *
     * @param taskStorage storage to use
     */
    public void setTaskStorage(ITaskStorage taskStorage) {
        this.taskStorage = taskStorage;
    }
}